package io.delta.flink.source;

import java.util.Arrays;
import java.util.List;

import io.delta.flink.source.internal.builder.BoundedDeltaSourceBuilder;
import io.delta.flink.source.internal.builder.DeltaBulkFormat;
import io.delta.flink.source.internal.builder.RowDataFormat;
import io.delta.flink.source.internal.enumerator.supplier.BoundedSnapshotSupplierFactory;
import io.delta.flink.source.internal.utils.SourceSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import static io.delta.flink.source.internal.DeltaSourceOptions.LOADED_SCHEMA_SNAPSHOT_VERSION;

/**
 * A builder class for {@link DeltaSource} for a stream of {@link RowData} where the created source
 * instance will operate in Bounded mode.
 * <p>
 * For most common use cases use {@link DeltaSource#forBoundedRowData} utility method to instantiate
 * the source. After instantiation of this builder you can either call {@link
 * RowDataBoundedDeltaSourceBuilder#build()} method to get the instance of a {@link DeltaSource} or
 * configure additional options using builder's API.
 */
public class RowDataBoundedDeltaSourceBuilder
    extends BoundedDeltaSourceBuilder<RowData, RowDataBoundedDeltaSourceBuilder> {

    RowDataBoundedDeltaSourceBuilder(
            Path tablePath,
            Configuration hadoopConfiguration,
            BoundedSnapshotSupplierFactory snapshotSupplierFactory) {
        super(tablePath, hadoopConfiguration, snapshotSupplierFactory);
    }

    //////////////////////////////////////////////////////////
    ///     We have to override methods from base class    ///
    /// to include them in javadoc generated by sbt-unidoc ///
    //////////////////////////////////////////////////////////

    /**
     * Specifies a {@link List} of column names that should be read from Delta table. If this method
     * is not used, Source will read all columns from Delta table.
     * <p>
     * If provided List is null or contains null, empty or blank elements it will throw a
     * {@code DeltaSourceValidationException} by builder after calling {@code build()} method.
     *
     * @param columnNames column names that should be read.
     */
    @Override
    public RowDataBoundedDeltaSourceBuilder columnNames(List<String> columnNames) {
        return super.columnNames(columnNames);
    }

    /**
     * Specifies an array of column names that should be read from Delta table. If this method
     * is not used, Source will read all columns from Delta table.
     * <p>
     * If provided List is null or contains null, empty or blank elements it will throw a
     * {@code DeltaSourceValidationException} by builder after calling {@code build()} method.
     *
     * @param columnNames column names that should be read.
     */
    public RowDataBoundedDeltaSourceBuilder columnNames(String... columnNames) {
        return super.columnNames(Arrays.asList(columnNames));
    }

    /**
     * Sets value of "versionAsOf" option. With this option we will load the given table version and
     * read from it.
     *
     * <p>
     * This option is mutually exclusive with {@link #timestampAsOf(String)} option.
     *
     * @param snapshotVersion Delta table version to time travel to.
     */
    @Override
    public RowDataBoundedDeltaSourceBuilder versionAsOf(long snapshotVersion) {
        return super.versionAsOf(snapshotVersion);
    }

    /**
     * Sets value of "timestampAsOf" option. With this option we will load the latest table version
     * that was generated at or before the given timestamp.
     * <p>
     * This option is mutually exclusive with {@link #versionAsOf(long)} option.
     *
     * @param snapshotTimestamp The timestamp we should time travel to. Supported formats are:
     *                          <ul>
     *                                <li>2022-02-24</li>
     *                                <li>2022-02-24 04:55:00</li>
     *                                <li>2022-02-24 04:55:00.001</li>
     *                                <li>2022-02-24T04:55:00</li>
     *                                <li>2022-02-24T04:55:00.001</li>
     *                                <li>2022-02-24T04:55:00.001Z</li>
     *                           </ul>
     */
    @Override
    public RowDataBoundedDeltaSourceBuilder timestampAsOf(String snapshotTimestamp) {
        return super.timestampAsOf(snapshotTimestamp);
    }

    /**
     * Sets a configuration option.
     *
     * @param optionName  Option name to set.
     * @param optionValue Option {@link String} value to set.
     */
    @Override
    public RowDataBoundedDeltaSourceBuilder option(String optionName, String optionValue) {
        return super.option(optionName, optionValue);
    }

    /**
     * Sets a configuration option.
     *
     * @param optionName  Option name to set.
     * @param optionValue Option boolean value to set.
     */
    @Override
    public RowDataBoundedDeltaSourceBuilder option(String optionName, boolean optionValue) {
        return super.option(optionName, optionValue);
    }

    /**
     * Sets a configuration option.
     *
     * @param optionName  Option name to set.
     * @param optionValue Option int value to set.
     */
    @Override
    public RowDataBoundedDeltaSourceBuilder option(String optionName, int optionValue) {
        return super.option(optionName, optionValue);
    }

    /**
     * Sets a configuration option.
     *
     * @param optionName  Option name to set.
     * @param optionValue Option long value to set.
     */
    @Override
    public RowDataBoundedDeltaSourceBuilder option(String optionName, long optionValue) {
        return super.option(optionName, optionValue);
    }

    /**
     * Creates an instance of {@link DeltaSource} for a stream of {@link RowData}. Created source
     * will work in Bounded mode, meaning it will read the content of the configured Delta snapshot
     * at the fixed version, ignoring all changes done to this table after starting this source.
     *
     * <p>
     * This method can throw {@code DeltaSourceValidationException} in case of invalid arguments
     * passed to Delta source builder.
     *
     * @return New {@link DeltaSource} instance.
     */
    @Override
    @SuppressWarnings("unchecked")
    public DeltaSource<RowData> build() {

        validate();

        // In this step, the Delta table schema discovery is made.
        // We load the snapshot corresponding to the latest/versionAsOf/timestampAsOf commit.
        // We are using this snapshot to extract the metadata and discover table's column names
        // and data types.
        SourceSchema sourceSchema = getSourceSchema();
        sourceConfiguration.addOption(
            LOADED_SCHEMA_SNAPSHOT_VERSION,
            sourceSchema.getSnapshotVersion()
        );

        DeltaBulkFormat<RowData> format = RowDataFormat.builder(
                RowType.of(sourceSchema.getColumnTypes(), sourceSchema.getColumnNames()),
                hadoopConfiguration)
            .partitionColumns(sourceSchema.getPartitionColumns())
            .build();

        return new DeltaSource<>(
            tablePath,
            format,
            DEFAULT_BOUNDED_SPLIT_ENUMERATOR_PROVIDER,
            hadoopConfiguration,
            sourceConfiguration);
    }
}
